package com.sshine.solon.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.sshine.solon.rabbitmq.core.Queue;
import com.sshine.solon.rabbitmq.core.*;
import com.sshine.solon.rabbitmq.impl.RabbitChannelFactory;
import com.sshine.solon.rabbitmq.impl.RabbitConsumeHandler;
import com.sshine.solon.rabbitmq.service.*;
import com.sshine.solon.rabbitmq.utils.Assert;
import com.sshine.solon.rabbitmq.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/**
 * @author noear
 * @since 1.2
 */
public class RabbitAdmin {
    private static Logger logger = LoggerFactory.getLogger(RabbitAdmin.class);


    /**
     * The default exchange name.
     */
    public static final String DEFAULT_EXCHANGE_NAME = "";

    /**
     * Property key for the queue name in the {@link Properties} returned by
     */
    public static final String RETRY_TIMES = "RETRY_TIMES";


    private static final String DELAYED_MESSAGE_EXCHANGE = "x-delayed-message";

    //交换机容器
    private static final Map<String,Exchange> EXCHANGE_CONTAINER = new ConcurrentHashMap<>();
    //队列容器
    private static final Map<String,Queue> QUEUE_CONTAINER = new ConcurrentHashMap<>();
    //绑定容器
    private static final Map<String,Binding> BINDING_CONTAINER = new ConcurrentHashMap<>();

    private static final Map<RabbitMessageHandler, List<String>> HANDLER_CONTAINER = new ConcurrentHashMap<>();

    private RabbitChannelFactory channelFactory;

    private List<MessagePostProcessor> beforePublishPostProcessors;

    private String encoding = "UTF-8";

    private MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    private MessageConvert messageConvert = new SimpleMessageConverter();

    public RabbitAdmin() {
        channelFactory = new RabbitChannelFactory();
    }

    public boolean convertAndSend(String exchange, String routingKey, final Object object) throws Exception {
        return convertAndSend(exchange, routingKey, convertMessageIfNecessary(object), null);
    }

    public boolean convertAndSend(String exchange, String routingKey, final Object object, Date scheduled) throws Exception {
        long ttl = 0;
        if (scheduled != null) {
            ttl = scheduled.getTime() - System.currentTimeMillis();
        }
        return convertAndSend(exchange, routingKey, convertMessageIfNecessary(object), ttl);
    }

    public boolean convertAndSend(String exchange, String routingKey, final Object object, long ttl) throws Exception {
        Channel channel = channelFactory.getChannel();
        return send(channel, exchange, routingKey, convertMessageIfNecessary(object), ttl);
    }

    private Message convertMessageIfNecessary(final Object object) throws UnsupportedEncodingException {
        if (object instanceof Message) {
            return (Message) object;
        }
        return this.messageConvert.toMessage(object,new MessageProperties());
    }

    /**
     * Send the given message to the specified exchange.
     *
     * @param channel       The RabbitMQ Channel to operate within.
     * @param exchangeArg   The name of the RabbitMQ exchange to send to.
     * @param routingKeyArg The routing key.
     * @param message       The Message to send.
     * @throws IOException If thrown by RabbitMQ API methods.
     */
    public boolean send(Channel channel, String exchangeArg, String routingKeyArg, Message message, long ttl) throws IOException, TimeoutException, InterruptedException {
        try {
            Assert.notEmpty(exchangeArg, "exchangeArg can't be null");
            if (logger.isTraceEnabled()) {
                logger.trace("Original message to publish: " + message);
            }

            Message messageToUse = message;
            MessageProperties messageProperties = message.getMessageProperties();
            if (this.beforePublishPostProcessors != null) {
                for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
                    messageToUse = processor.postProcessMessage(messageToUse);
                }
            }
            if (ttl > 0) {
                messageProperties.setExpiration(String.valueOf(ttl));
            }
            messageProperties.setHeader(RETRY_TIMES,0);
            if (logger.isDebugEnabled()) {
                logger.debug("Publishing message [" + messageToUse
                        + "] on exchange [" + exchangeArg + "], routingKey = [" + routingKeyArg + "]");
            }
            return sendToRabbit(channel, exchangeArg, routingKeyArg, messageToUse);
        } finally {
            channelFactory.closeChannel();
        }
    }

    protected boolean sendToRabbit(Channel channel, String exchange, String routingKey,
                                Message message) throws IOException, TimeoutException, InterruptedException {
        AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter
                .fromMessageProperties(message.getMessageProperties(), this.encoding);
        channel.basicPublish(exchange, routingKey, convertedMessageProperties, message.getBody());
        long timeout = RabbitmqProps.getRabbitmqTimeout();
        if (timeout > 0) {
            return channel.waitForConfirms(timeout);
        }
        return true;
    }

    protected void init() {
        for (String name : EXCHANGE_CONTAINER.keySet()) {
            Exchange exchange = EXCHANGE_CONTAINER.get(name);
            if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                        + exchange.getName()
                        + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                        + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                        + "reopening the connection.");
            }
        }
        for (String name : QUEUE_CONTAINER.keySet()) {
            Queue queue = QUEUE_CONTAINER.get(name);
            if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                        + queue.getName()
                        + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                        + queue.isExclusive() + ". "
                        + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                        + "alive, but all messages will be lost.");
            }
        }

        if (EXCHANGE_CONTAINER.size() == 0 && QUEUE_CONTAINER.size() == 0 && BINDING_CONTAINER.size() == 0) {
            this.logger.debug("Nothing to declare");
            return;
        }
        Channel channel = channelFactory.getChannel();
        try {
            declareExchanges(channel, EXCHANGE_CONTAINER.values().toArray(new Exchange[EXCHANGE_CONTAINER.size()]));
            declareQueues(channel, QUEUE_CONTAINER.values().toArray(new Queue[QUEUE_CONTAINER.size()]));
            declareBindings(channel, BINDING_CONTAINER.values().toArray(new Binding[BINDING_CONTAINER.size()]));
        } catch (IOException e) {
            throw new RabbitmqException("Declare exception", e);
        }
        channelFactory.closeChannel();
        attentionQueues(HANDLER_CONTAINER);
        this.logger.debug("Declarations finished");
    }

    public void declareExchanges(final Channel channel, final Exchange... exchanges) throws IOException {
        for (Exchange exchange : exchanges) {
            if (!isDeclaringDefaultExchange(exchange)) {
                try {
                    if (exchange.isDelayed()) {
                        Map<String, Object> arguments = exchange.getArguments();
                        if (arguments == null) {
                            arguments = new HashMap<String, Object>();
                        } else {
                            arguments = new HashMap<String, Object>(arguments);
                        }
                        arguments.put("x-delayed-type", exchange.getType().getType());
                        channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),
                                exchange.isAutoDelete(), exchange.isInternal(), arguments);
                    } else {
                        channel.exchangeDeclare(exchange.getName(), exchange.getType().getType(), exchange.isDurable(),
                                exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments());
                    }
                    long timeout = RabbitmqProps.getRabbitmqTimeout();
                    if (timeout > 0) {
                        channel.confirmSelect();
                    }
                } catch (IOException e) {
                    logOrRethrowDeclarationException(exchange, "exchange", e);
                }
            }
        }
    }

    private void declareQueues(final Channel channel, final Queue... queues) throws IOException {
        for (int i = 0; i < queues.length; i++) {
            Queue queue = queues[i];
            if (!queue.getName().startsWith("amq.")) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("declaring Queue '" + queue.getName() + "'");
                }
                try {
                    try {
                        AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
                                queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
                        if (StringUtils.hasText(declareOk.getQueue())) {
                            queue.setActualName(declareOk.getQueue());
                        }
                    } catch (IllegalArgumentException e) {
                        closeChannelAfterIllegalArg(channel, queue);
                        throw new IOException(e);
                    }
                } catch (IOException e) {
                    logOrRethrowDeclarationException(queue, "queue", e);
                }
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug(queue.getName() + ": Queue with name that starts with 'amq.' cannot be declared.");
            }
        }
    }

    private void declareBindings(final Channel channel, final Binding... bindings) throws IOException {
        for (Binding binding : bindings) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType()
                        + ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey()
                        + "]");
            }

            try {
                if (binding.isDestinationQueue()) {
                    if (!isDeclaringImplicitQueueBinding(binding)) {
                        channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
                                binding.getArguments());
                    }
                } else {
                    channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
                            binding.getArguments());
                }
            } catch (IOException e) {
                logOrRethrowDeclarationException(binding, "binding", e);
            }
        }
    }

    private void attentionQueues(final Map<RabbitMessageHandler, List<String>> handlers) {
        for (RabbitMessageHandler handler : handlers.keySet()) {
            for (String queue : handlers.get(handler)) {
                try {
                    Channel channel = channelFactory.getConnection().createChannel();
                    RabbitConsumeHandler consumeHandler = new RabbitConsumeHandler(channel, handler,this);
                    int prefetchCount = RabbitmqProps.getRabbitmqPrefetchCount();
                    if (prefetchCount < 1) {
                        prefetchCount = 10;
                    }

                    //申明同时接收数量
                    channel.basicQos(prefetchCount);
                    channel.basicConsume(queue, consumeHandler);
                } catch (IOException e) {
                    throw new RabbitmqException("Rabbitmq listener create fail", e);
                } catch (TimeoutException e) {
                    throw new RabbitmqException("Rabbitmq listener create fail", e);
                }
            }
        }
    }

    private boolean isDeclaringDefaultExchange(Exchange exchange) {
        if (isDefaultExchange(exchange.getName())) {
            this.logger.debug("Default exchange is pre-declared by server.");
            return true;
        }
        return false;
    }

    private <T extends Throwable> void logOrRethrowDeclarationException(Declarable element,
                                                                        String elementType, T t) throws T {
        if (element != null && element.isIgnoreDeclarationExceptions()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Failed to declare " + elementType
                        + ": " + (element == null ? "broker-generated" : element)
                        + ", continuing...", t);
            } else if (this.logger.isWarnEnabled()) {
                Throwable cause = t;
                if (t instanceof IOException && t.getCause() != null) {
                    cause = t.getCause();
                }
                this.logger.warn("Failed to declare " + elementType
                        + ": " + (element == null ? "broker-generated" : element)
                        + ", continuing... " + cause);
            }
        } else {
            throw t;
        }
    }

    private void closeChannelAfterIllegalArg(final Channel channel, Queue queue) {
        if (this.logger.isDebugEnabled()) {
            this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
        }
        try {
            channel.close();
        } catch (IOException | TimeoutException e1) {
            this.logger.error("Failed to close channel after illegal argument", e1);
        }
    }

    private boolean isDeclaringImplicitQueueBinding(Binding binding) {
        if (isImplicitQueueBinding(binding)) {
            this.logger.debug("The default exchange is implicitly bound to every queue," +
                    " with a routing key equal to the queue name.");
            return true;
        }
        return false;
    }

    private boolean isImplicitQueueBinding(Binding binding) {
        return isDefaultExchange(binding.getExchange()) && binding.getDestination().equals(binding.getRoutingKey());
    }

    private boolean isDefaultExchange(String exchangeName) {
        return DEFAULT_EXCHANGE_NAME.equals(exchangeName);
    }

    public static void addToContainer(Exchange exchange) {
        Exchange exist = EXCHANGE_CONTAINER.get(exchange.getName());
        if (exist == null) {
            EXCHANGE_CONTAINER.put(exchange.getName(),exchange);
        }
    }

    public static void addToContainer(Queue queue) {
        Queue exist = QUEUE_CONTAINER.get(queue.getName());
        if (exist == null) {
            QUEUE_CONTAINER.put(queue.getName(),queue);
        }
    }

    public static void addToContainer(Binding binding) {
        Binding exist = BINDING_CONTAINER.get(binding.getId());
        if (exist == null) {
            BINDING_CONTAINER.put(binding.getId(),binding);
        }
    }

    public static void addToContainer(RabbitMessageHandler handler, List<String> queues) {
        HANDLER_CONTAINER.put(handler, queues);
    }

    /**
     * Set {@link MessagePostProcessor}s that will be invoked immediately before invoking
     * {@code Channel#basicPublish()}, after all other processing, except creating the
     * {@link AMQP.BasicProperties} from {@link MessageProperties}. May be used for operations
     * such as compression. Processors are invoked in order, depending on {@code PriorityOrder},
     * {@code Order} and finally unordered.
     *
     * @param beforePublishPostProcessors the post processor.
     * @see #addBeforePublishPostProcessors(MessagePostProcessor...)
     * @since 1.4.2
     */
    public void setBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors) {
        Assert.notNull(beforePublishPostProcessors, "'beforePublishPostProcessors' cannot be null");
        Assert.noNullElements(beforePublishPostProcessors, "'beforePublishPostProcessors' cannot have null elements");
        List<MessagePostProcessor> list = Arrays.asList(beforePublishPostProcessors);
        this.beforePublishPostProcessors = list;
        Collections.sort(this.beforePublishPostProcessors);
    }

    /**
     * Add {@link MessagePostProcessor} that will be invoked immediately before invoking
     * {@code Channel#basicPublish()}, after all other processing, except creating the
     * {@link AMQP.BasicProperties} from {@link MessageProperties}. May be used for operations
     * such as compression. Processors are invoked in order, depending on {@code PriorityOrder},
     * {@code Order} and finally unordered.
     * <p>
     * In contrast to {@link #setBeforePublishPostProcessors(MessagePostProcessor...)}, this
     * method does not override the previously added beforePublishPostProcessors.
     *
     * @param beforePublishPostProcessors the post processor.
     * @since 2.1.4
     */
    public void addBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors) {
        Assert.notNull(beforePublishPostProcessors, "'beforePublishPostProcessors' cannot be null");
        if (this.beforePublishPostProcessors == null) {
            this.beforePublishPostProcessors = new ArrayList<>();
        }
        this.beforePublishPostProcessors.addAll(Arrays.asList(beforePublishPostProcessors));
        Collections.sort(this.beforePublishPostProcessors);
    }

    /**
     * Remove the provided {@link MessagePostProcessor} from the {@link #beforePublishPostProcessors} list.
     *
     * @param beforePublishPostProcessor the MessagePostProcessor to remove.
     * @return the boolean if the provided post processor has been removed.
     * @see #addBeforePublishPostProcessors(MessagePostProcessor...)
     * @since 2.1.4
     */
    public boolean removeBeforePublishPostProcessor(MessagePostProcessor beforePublishPostProcessor) {
        Assert.notNull(beforePublishPostProcessor, "'beforePublishPostProcessor' cannot be null");
        if (this.beforePublishPostProcessors != null) {
            return this.beforePublishPostProcessors.remove(beforePublishPostProcessor);
        }
        return false;
    }

    public String getEncoding() {
        return encoding;
    }

    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    /**
     * Set the {@link MessagePropertiesConverter} for this template. This converter is used to convert between raw byte
     * content in the message headers and plain Java objects. In particular there are limitations when dealing with very
     * long string headers, which hopefully are rare in practice, but if you need to use long headers you might need to
     * inject a special converter here.
     *
     * @param messagePropertiesConverter The message properties converter.
     */
    public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) {
        Assert.notNull(messagePropertiesConverter, "messagePropertiesConverter must not be null");
        this.messagePropertiesConverter = messagePropertiesConverter;
    }

    /**
     * Return the properties converter.
     *
     * @return the converter.
     * @since 2.0
     */
    public MessagePropertiesConverter getMessagePropertiesConverter() {
        return this.messagePropertiesConverter;
    }

    public void setMessageConvert(MessageConvert messageConvert) {
        this.messageConvert = messageConvert;
    }
}
